Skip to main content
Glama

Indigo MCP Server Plugin

by mlamoure
mcp_handler.py26.5 kB
""" MCP Handler for Indigo IWS integration. Implements standards-compliant MCP protocol over Indigo's built-in web server. """ import json import logging import os import secrets import time from typing import Any, Dict, List, Optional, Union from .adapters.data_provider import DataProvider from .common.indigo_device_types import IndigoDeviceType, IndigoEntityType, DeviceTypeResolver from .common.json_encoder import safe_json_dumps from .common.vector_store.vector_store_manager import VectorStoreManager from .handlers.list_handlers import ListHandlers from .resource_registry import get_resource_schemas from .tool_registry import get_tool_schemas from .tool_wrappers import ToolWrappers from .tools.action_control import ActionControlHandler from .tools.device_control import DeviceControlHandler from .tools.get_devices_by_type import GetDevicesByTypeHandler from .tools.historical_analysis import HistoricalAnalysisHandler from .tools.log_query import LogQueryHandler from .tools.plugin_control import PluginControlHandler from .tools.rgb_control import RGBControlHandler from .tools.search_entities import SearchEntitiesHandler from .tools.thermostat_control import ThermostatControlHandler from .tools.variable_control import VariableControlHandler class MCPHandler: """Handles MCP protocol requests through Indigo IWS.""" # MCP Protocol version we support PROTOCOL_VERSION = "2025-06-18" def __init__( self, data_provider: DataProvider, logger: Optional[logging.Logger] = None ): """ Initialize the MCP handler. Args: data_provider: Data provider for accessing entity data logger: Optional logger instance """ self.data_provider = data_provider self.logger = logger or logging.getLogger("Plugin") # Session management self._sessions = {} # session_id -> {created, last_seen, client_info} # Get database path from environment variable db_path = os.environ.get("DB_FILE") if not db_path: raise ValueError("DB_FILE environment variable must be set") # Initialize vector store manager self.vector_store_manager = VectorStoreManager( data_provider=data_provider, db_path=db_path, logger=self.logger, update_interval=300, # 5 minutes ) # Start vector store manager (it will log its own progress) self.vector_store_manager.start() # Initialize handlers self._init_handlers() # Register tools and resources self._tools = {} self._resources = {} self._register_tools() self._register_resources() self.logger.info(f"\t🚀 MCP Server ready ({len(self._tools)} tools, {len(self._resources)} resources)") self.logger.info(f"\t🌐 Endpoint: /message/com.vtmikel.mcp_server/mcp/") def _init_handlers(self): """Initialize all handler instances.""" # Search handler with vector store self.search_handler = SearchEntitiesHandler( data_provider=self.data_provider, vector_store=self.vector_store_manager.get_vector_store(), logger=self.logger, ) # Get devices by type handler self.get_devices_by_type_handler = GetDevicesByTypeHandler( data_provider=self.data_provider, logger=self.logger ) # List handlers for shared logic self.list_handlers = ListHandlers( data_provider=self.data_provider, logger=self.logger ) # Control handlers self.device_control_handler = DeviceControlHandler( data_provider=self.data_provider, logger=self.logger ) self.variable_control_handler = VariableControlHandler( data_provider=self.data_provider, logger=self.logger ) self.action_control_handler = ActionControlHandler( data_provider=self.data_provider, logger=self.logger ) self.rgb_control_handler = RGBControlHandler( data_provider=self.data_provider, logger=self.logger ) self.thermostat_control_handler = ThermostatControlHandler( data_provider=self.data_provider, logger=self.logger ) self.historical_analysis_handler = HistoricalAnalysisHandler( data_provider=self.data_provider, logger=self.logger ) self.log_query_handler = LogQueryHandler( data_provider=self.data_provider, logger=self.logger ) self.plugin_control_handler = PluginControlHandler( data_provider=self.data_provider, logger=self.logger ) # Initialize tool wrappers with all handlers self.tool_wrappers = ToolWrappers( search_handler=self.search_handler, get_devices_by_type_handler=self.get_devices_by_type_handler, device_control_handler=self.device_control_handler, rgb_control_handler=self.rgb_control_handler, thermostat_control_handler=self.thermostat_control_handler, variable_control_handler=self.variable_control_handler, action_control_handler=self.action_control_handler, historical_analysis_handler=self.historical_analysis_handler, list_handlers=self.list_handlers, log_query_handler=self.log_query_handler, plugin_control_handler=self.plugin_control_handler, data_provider=self.data_provider, logger=self.logger ) def stop(self): """Stop the MCP handler and cleanup resources.""" if self.vector_store_manager: self.vector_store_manager.stop() def handle_request( self, method: str, headers: Dict[str, str], body: str ) -> Dict[str, Any]: """ Handle an MCP request from Indigo IWS. Args: method: HTTP method (GET, POST, etc.) headers: Request headers body: Request body as string Returns: Dict with status, headers, and content for IWS response """ # Normalize headers to lowercase headers = {k.lower(): v for k, v in headers.items()} accept = headers.get("accept", "") # Only support POST if method != "POST": return { "status": 405, "headers": {"Allow": "POST"}, "content": "" } # Check Accept header - client must accept json or event-stream if "application/json" not in accept and "text/event-stream" not in accept: self.logger.debug(f"Invalid Accept header: '{accept}'") return {"status": 406, "content": "Not Acceptable"} # Parse JSON body try: payload = json.loads(body) if body else None except Exception as e: self.logger.error(f"Failed to parse JSON body: {e}") return self._json_response( self._json_error(None, -32700, "Parse error"), status=200 ) # Handle empty or invalid payload if not payload: return self._json_response( self._json_error(None, -32600, "Invalid Request"), status=200 ) # MCP 2025-06-18 spec removes support for JSON-RPC batching if isinstance(payload, list): self.logger.debug("Batch requests not supported") return self._json_response( self._json_error(None, -32600, "Batch requests not supported"), status=200 ) # Process single message try: # Single message resp = self._dispatch_message(payload, headers) # If it was a notification (no id), return 200 with empty JSON for IWS compatibility if isinstance(payload, dict) and "id" not in payload: return { "status": 200, "headers": {"Content-Type": "application/json; charset=utf-8"}, "content": "{}" } # Check for session ID in response extra_headers = {} if isinstance(resp, dict) and "_mcp_session_id" in resp: session_id = resp.pop("_mcp_session_id") extra_headers["Mcp-Session-Id"] = session_id return { "status": 200, "headers": { "Content-Type": "application/json; charset=utf-8", **extra_headers }, "content": json.dumps(resp) } except Exception as e: self.logger.exception("Unhandled MCP error") return self._json_response( self._json_error(None, -32603, "Internal error"), status=200 ) def _dispatch_message( self, msg: Dict[str, Any], headers: Dict[str, str] ) -> Optional[Dict[str, Any]]: """ Dispatch a single JSON-RPC message. Args: msg: JSON-RPC message headers: Request headers Returns: JSON-RPC response or None for notifications """ # Validate JSON-RPC structure if not isinstance(msg, dict) or msg.get("jsonrpc") != "2.0" or "method" not in msg: self.logger.debug(f"Invalid JSON-RPC message structure") return self._json_error(msg.get("id"), -32600, "Invalid Request") msg_id = msg.get("id") # May be None for notifications method = msg["method"] params = msg.get("params") or {} # Extract client IP from headers (check common proxy headers first) client_ip = ( headers.get("x-forwarded-for", "").split(",")[0].strip() or headers.get("x-real-ip", "") or headers.get("remote-addr", "") or "unknown" ) # Log incoming request at INFO level (concise) session_id = headers.get("mcp-session-id", "") session_short = session_id[:8] if session_id else "none" # Get client info from session if available client_label = client_ip if session_id and session_id in self._sessions: session_data = self._sessions[session_id] # Try to use client name if available client_info = session_data.get("client_info", {}) client_name = client_info.get("name", "") if client_name: client_label = f"{client_name}@{client_ip}" # Format method for logging if method.startswith("notifications/"): log_method = method.replace("notifications/", "notify:") elif "/" in method: log_method = method.replace("/", ":") else: log_method = method # Only log significant MCP operations at INFO, move protocol to DEBUG significant_methods = ["tools/call", "resources/read"] if any(method.startswith(sm) for sm in significant_methods): self.logger.info(f"📨 {log_method} | {client_label} | session: {session_short}") else: self.logger.debug(f"📨 {log_method} | {client_label} | session: {session_short}") # MCP 2025-06-18 requires MCP-Protocol-Version header for HTTP transport protocol_version_header = headers.get("mcp-protocol-version") if method != "initialize" and not method.startswith("notifications/") and self._sessions: if protocol_version_header and protocol_version_header != self.PROTOCOL_VERSION: self.logger.debug(f"Invalid protocol version: {protocol_version_header}") return self._json_error(msg_id, -32600, f"Unsupported protocol version: {protocol_version_header}") # Session validation (skip for initialize and notifications) session_id = headers.get("mcp-session-id") if method != "initialize" and not method.startswith("notifications/") and self._sessions: if not session_id or session_id not in self._sessions: self.logger.debug(f"Invalid session ID for {method}") return self._json_error(msg_id, -32600, "Missing or invalid Mcp-Session-Id") # Update last seen self._sessions[session_id]["last_seen"] = time.time() # Route to appropriate handler if method == "initialize": return self._handle_initialize(msg_id, params, client_ip) elif method == "ping": return {"jsonrpc": "2.0", "id": msg_id, "result": {}} elif method == "notifications/cancelled": self._handle_cancelled(params) return None elif method == "notifications/initialized": return None # Tool methods elif method == "tools/list": return self._handle_tools_list(msg_id, params) elif method == "tools/call": return self._handle_tools_call(msg_id, params) # Resource methods elif method == "resources/list": return self._handle_resources_list(msg_id, params) elif method == "resources/read": return self._handle_resources_read(msg_id, params) # Prompt methods (stubs for now) elif method == "prompts/list": return { "jsonrpc": "2.0", "id": msg_id, "result": {"prompts": []} } elif method == "prompts/get": return self._json_error(msg_id, -32602, "Unknown prompt") # Unknown method else: if method.startswith("notifications/"): # Unknown notifications ignored gracefully return None else: self.logger.debug(f"Unknown method: {method}") return self._json_error(msg_id, -32601, "Method not found") def _handle_initialize( self, msg_id: Any, params: Dict[str, Any], client_ip: str = "unknown" ) -> Dict[str, Any]: """Handle initialize request.""" requested_version = str(params.get("protocolVersion") or "") client_info = params.get("clientInfo", {}) client_name = client_info.get("name", "Unknown") # Check if we support the requested version if requested_version == self.PROTOCOL_VERSION: # Create new session session_id = secrets.token_urlsafe(24) self._sessions[session_id] = { "created": time.time(), "last_seen": time.time(), "client_info": client_info, "client_ip": client_ip } # Log new session creation with client details self.logger.info(f"🔌 New session: {client_name}@{client_ip} | session: {session_id[:8]}") result = { "jsonrpc": "2.0", "id": msg_id, "result": { "protocolVersion": self.PROTOCOL_VERSION, "capabilities": { "logging": {}, "prompts": {"listChanged": True}, "resources": {"subscribe": False, "listChanged": True}, "tools": {"listChanged": True} }, "serverInfo": { "name": "Indigo MCP Server", "version": "2025.0.1" } } } # Add session ID for header result["_mcp_session_id"] = session_id self.logger.info(f"\t✅ Client initialized: {client_name} | session: {session_id[:8]}") return result else: # Unsupported version self.logger.debug(f"Unsupported protocol version: {requested_version}") return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32602, "message": "Unsupported protocol version", "data": { "supported": [self.PROTOCOL_VERSION], "requested": requested_version } } } def _handle_cancelled(self, params: Dict[str, Any]): """Handle cancellation notification.""" # In a synchronous implementation, we can't really cancel ongoing work # This is for async implementations only pass def _handle_tools_list( self, msg_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle tools/list request.""" # Convert tool functions to tool descriptions tools = [] for name, info in self._tools.items(): tools.append({ "name": name, "description": info["description"], "inputSchema": info["inputSchema"] }) return { "jsonrpc": "2.0", "id": msg_id, "result": { "tools": tools } } def _handle_tools_call( self, msg_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle tools/call request.""" tool_name = params.get("name") tool_args = params.get("arguments", {}) if tool_name not in self._tools: return self._json_error(msg_id, -32602, f"Unknown tool: {tool_name}") try: # Call the tool function result = self._tools[tool_name]["function"](**tool_args) return { "jsonrpc": "2.0", "id": msg_id, "result": { "content": [ { "type": "text", "text": result } ] } } except Exception as e: self.logger.error(f"Tool {tool_name} error: {e}") return self._json_error( msg_id, -32603, f"Tool execution failed: {str(e)}" ) def _handle_resources_list( self, msg_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle resources/list request.""" resources = [] for uri, info in self._resources.items(): resources.append({ "uri": uri, "name": info["name"], "description": info["description"], "mimeType": "application/json" }) return { "jsonrpc": "2.0", "id": msg_id, "result": { "resources": resources } } def _handle_resources_read( self, msg_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle resources/read request.""" uri = params.get("uri") if not uri: return self._json_error(msg_id, -32602, "Missing uri parameter") # Try exact match first if uri in self._resources: try: content = self._resources[uri]["function"]() return { "jsonrpc": "2.0", "id": msg_id, "result": { "contents": [ { "uri": uri, "mimeType": "application/json", "text": content } ] } } except Exception as e: self.logger.error(f"Resource {uri} error: {e}") return self._json_error( msg_id, -32603, f"Resource read failed: {str(e)}" ) # Try pattern matching for parameterized resources for pattern, info in self._resources.items(): if "{" in pattern: # Has parameters # Simple pattern matching (e.g., "indigo://devices/{id}") base_pattern = pattern.split("{")[0] if uri.startswith(base_pattern): # Extract parameter value param_value = uri[len(base_pattern):] if param_value: try: content = info["function"](param_value) return { "jsonrpc": "2.0", "id": msg_id, "result": { "contents": [ { "uri": uri, "mimeType": "application/json", "text": content } ] } } except Exception as e: self.logger.error(f"Resource {uri} error: {e}") return self._json_error( msg_id, -32603, f"Resource read failed: {str(e)}" ) return self._json_error(msg_id, -32002, f"Resource not found: {uri}") def _register_tools(self): """Register all available tools using extracted tool registry.""" # Create tool functions dictionary mapping tool names to wrapper methods tool_functions = { "search_entities": self.tool_wrappers.tool_search_entities, "get_devices_by_type": self.tool_wrappers.tool_get_devices_by_type, "device_turn_on": self.tool_wrappers.tool_device_turn_on, "device_turn_off": self.tool_wrappers.tool_device_turn_off, "device_set_brightness": self.tool_wrappers.tool_device_set_brightness, "device_set_rgb_color": self.tool_wrappers.tool_device_set_rgb_color, "device_set_rgb_percent": self.tool_wrappers.tool_device_set_rgb_percent, "device_set_hex_color": self.tool_wrappers.tool_device_set_hex_color, "device_set_named_color": self.tool_wrappers.tool_device_set_named_color, "device_set_white_levels": self.tool_wrappers.tool_device_set_white_levels, "thermostat_set_heat_setpoint": self.tool_wrappers.tool_thermostat_set_heat_setpoint, "thermostat_set_cool_setpoint": self.tool_wrappers.tool_thermostat_set_cool_setpoint, "thermostat_set_hvac_mode": self.tool_wrappers.tool_thermostat_set_hvac_mode, "thermostat_set_fan_mode": self.tool_wrappers.tool_thermostat_set_fan_mode, "variable_update": self.tool_wrappers.tool_variable_update, "variable_create": self.tool_wrappers.tool_variable_create, "action_execute_group": self.tool_wrappers.tool_action_execute_group, "analyze_historical_data": self.tool_wrappers.tool_analyze_historical_data, "list_devices": self.tool_wrappers.tool_list_devices, "list_variables": self.tool_wrappers.tool_list_variables, "list_action_groups": self.tool_wrappers.tool_list_action_groups, "list_variable_folders": self.tool_wrappers.tool_list_variable_folders, "get_devices_by_state": self.tool_wrappers.tool_get_devices_by_state, "get_device_by_id": self.tool_wrappers.tool_get_device_by_id, "get_variable_by_id": self.tool_wrappers.tool_get_variable_by_id, "get_action_group_by_id": self.tool_wrappers.tool_get_action_group_by_id, "query_event_log": self.tool_wrappers.tool_query_event_log, "list_plugins": self.tool_wrappers.tool_list_plugins, "get_plugin_by_id": self.tool_wrappers.tool_get_plugin_by_id, "restart_plugin": self.tool_wrappers.tool_restart_plugin, "get_plugin_status": self.tool_wrappers.tool_get_plugin_status, } # Get tool schemas from registry self._tools = get_tool_schemas(tool_functions) def _register_resources(self): """Register all available resources using extracted resource registry.""" # Create resource functions dictionary mapping resource names to wrapper methods resource_functions = { "list_devices": self.tool_wrappers.resource_list_devices, "get_device": self.tool_wrappers.resource_get_device, "list_variables": self.tool_wrappers.resource_list_variables, "get_variable": self.tool_wrappers.resource_get_variable, "list_actions": self.tool_wrappers.resource_list_actions, "get_action": self.tool_wrappers.resource_get_action, } # Get resource schemas from registry self._resources = get_resource_schemas(resource_functions) def _json_response(self, obj: Any, status: int = 200) -> Dict[str, Any]: """Create JSON response for IWS.""" return { "status": status, "headers": {"Content-Type": "application/json; charset=utf-8"}, "content": json.dumps(obj) } def _json_error( self, msg_id: Any, code: int, message: str, data: Any = None ) -> Dict[str, Any]: """Create JSON-RPC error response.""" error = { "jsonrpc": "2.0", "error": { "code": code, "message": message } } if data is not None: error["error"]["data"] = data if msg_id is not None: error["id"] = msg_id return error

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mlamoure/indigo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server